package com.daoshu.mti.kafkaclient.handle;

import com.alibaba.fastjson.JSON;
import com.daoshu.mti.kafkaclient.bean.TKmTrailEntity;
import com.daoshu.mti.kafkaclient.constant.Topic;
import com.daoshu.mti.kafkaclient.consumer.TKmTrailSetConsumer;
import com.daoshu.mti.kafkaclient.service.TKmTrailSetService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;

/**
 * @Classname TKmTrailSetHandle
 * @Description TODO
 * @Date 2020/1/17 21:15
 * @Created by duchaof
 * @Version 1.0
 */
@Slf4j
@Component
public class TKmTrailSetHandle extends TKmTrailSetConsumer {
    private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private TKmTrailSetService tKmTrailSetservice;


    @Override
    @Transactional(rollbackFor = Exception.class)
    protected void onDealMessage(String message) {
        log.info("接收到topic：" + Topic.WARN + "的消息" + ",线程：" + Thread.currentThread().getName() + "在" + sdf.format(new Date()) + "开始处理接受到的消息：" + message);
        TKmTrailEntity entity = JSON.parseObject(message, TKmTrailEntity.class);
        Integer res = null;
        try {
            Boolean repeatFlag = tKmTrailSetservice.judgeRepeatData(entity.getId());
            log.info("数据检查是否重复[TKmTrailSetHandle]" + repeatFlag);
            if (repeatFlag) {
                return;
            } else {
                //把以前数据值为0  isnew
                tKmTrailSetservice.update(entity.getSetId());
                entity.setIsRead("0");
                res = tKmTrailSetservice.insert(entity);
                log.info("数据入库[TKmTrailSetHandle]" + res);
            }
        } catch (Exception e) {
            log.error("插入时发生异常：==========》{}", e.getMessage());
        }
    }
}
